package com.bw.gmall.realtime.Day0919;

import com.bw.gmall.realtime.Day0911.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaTest {
    public static void main(String[] args) throws Exception {
        // 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);


        String kafkaDDL = MyKafkaUtil.getKafkaDDL("topic_log", "xxx");
        System.out.println(kafkaDDL);
        DataStreamSource<String> kfkDs = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("topic_log", "xxx").setStartFromEarliest());
        System.out.println("<=============================Kafka日志数据=============================>");
        kfkDs.print();




        env.execute();
    }
}
